👋 Hello DSKC¶

Me¶

David Wagner

Machine Learning Engineer @ GenHealth.ai

Previously

  • EyeVerify / ZOLOZ
  • Mycroft
  • TripleBlind

I ❤️ Python, Jupyter, neovim, plots, seeing htop light up, finding that one-character bug

What are we here for?¶

🍕¶

🗣️¶

🐻‍❄️¶

Why polars?¶

1. Blazing fast ⚡️¶

2. Beautiful API 🦋¶

1. Blazing fast ⚡️¶

DuckDB Benchmark 2023-11-03 (previously run by H2O, and originally started in 2013 by the author of data.table)


Written in Rust 🦀¶

Rust is doing some real magic for the Python ecosystem:

All-in-one Python linting, formatting, etc. with ruff

OpenAI's tiktoken

Recently ran across nbstripout-fast, a Rust rewrite of nbstripout; it makes stripping notebook output in a git hook much more transparent:

(train) β time nbstripout *.ipynb

real	0m0.299s
user	0m0.275s
sys	0m0.024s
(train) β time nbstripout-fast *.ipynb

real	0m0.007s
user	0m0.004s
sys	0m0.003s

Based on Arrow¶

Pandas 2.0 and its Ecosystem (Arrow, Polars, DuckDB)

Can get speedups in pandas just from:
1. Using Parquet and Arrow files
2. Using the pandas 2.0 Arrow backend


This also gives polars very nice no-copy interop with the broader Python ecosystem.

  • to_numpy()
  • to_pandas()
  • For duckdb (docs):
import duckdb
import polars as pl

df = pl.DataFrame(
    {
        "A": [1, 2, 3, 4, 5],
        "fruits": ["banana", "banana", "apple", "apple", "banana"],
        "B": [5, 4, 3, 2, 1],
        "cars": ["beetle", "audi", "beetle", "beetle", "beetle"],
    }
)
duckdb.sql('SELECT * FROM df').show()

Query Engine Optimization¶

Ritchie Vink (polars author) on polars success:

OLAP query engine: I see DataFrames as a front-end on a query engine. Polars is a fast vectorized query engine that has excellent performance and memory usage. Besides that, Polars comes with a query optimizer, meaning that users can write idiomatic code and the optimizer focuses on making it fast

2. Beautiful API 🦋¶

Ritchie Vink (polars author) on polars success:

A strict, consistent and composable API. Polars gives you the hangover up front and fails fast, making it very suitable for writing correct data pipelines.

What package do we all know and love that has a particularly messy API?¶


What visualization library has a beautiful API?¶


pandas feels closer to matplotlib than ggplot

Feels a bit like… (meme images)

What dataframe library has a beautiful API?¶


When polars?¶

  • Single node
  • Fits in memory
  • Larger-than-memory datasets, but not Spark-scale
  • ~4 billion row limit before needing the bigidx feature: pip install polars-u64-idx
  • Has worked quite well into the low TBs in my own usage, using memory mapping, streaming, or chunking the computation.

The Basics¶

In [21]:
import polars as pl
In [23]:
df = pl.read_csv("datasets/ml-latest/ratings.csv")
df.head()
Out[23]:
shape: (5, 4)
userId | movieId | rating | timestamp
i64    | i64     | f64    | i64
1      | 1       | 4.0    | 1225734739
1      | 110     | 4.0    | 1225865086
1      | 158     | 4.0    | 1225733503
1      | 260     | 4.5    | 1225735204
1      | 356     | 5.0    | 1225735119
In [26]:
df.sample(5)
Out[26]:
shape: (5, 4)
userId | movieId | rating | timestamp
i64    | i64     | f64    | i64
286248 | 157340  | 2.5    | 1509388136
16551  | 9164    | 4.0    | 842971243
184349 | 72393   | 3.5    | 1488081527
117609 | 81845   | 3.5    | 1590390585
14804  | 8266    | 4.0    | 1337202900
In [27]:
df.describe()
Out[27]:
shape: (9, 5)
describe     | userId        | movieId      | rating      | timestamp
str          | f64           | f64          | f64         | f64
"count"      | 3.3832162e7   | 3.3832162e7  | 3.3832162e7 | 3.3832162e7
"null_count" | 0.0           | 0.0          | 0.0         | 0.0
"mean"       | 165437.983892 | 8313.483494  | 3.54254     | 1.2694e9
"std"        | 95341.222885  | 49928.650928 | 1.063959    | 2.5410e8
"min"        | 1.0           | 1.0          | 0.5         | 7.89652004e8
"25%"        | 82953.0       | 1219.0       | 3.0         | 1.0467e9
"50%"        | 166129.0      | 3263.0       | 4.0         | 1.2647e9
"75%"        | 247450.0      | 40491.0      | 4.0         | 1.4969e9
"max"        | 330975.0      | 288983.0     | 5.0         | 1.6898e9
In [132]:
df.write_parquet("transformed_dataset.parquet")

No index!¶

In [175]:
df.with_row_count().head()
Out[175]:
shape: (5, 5)
row_nr | userId | movieId | rating | timestamp
u32    | i64    | i64     | f64    | i64
0      | 1      | 1       | 4.0    | 1225734739
1      | 1      | 110     | 4.0    | 1225865086
2      | 1      | 158     | 4.0    | 1225733503
3      | 1      | 260     | 4.5    | 1225735204
4      | 1      | 356     | 5.0    | 1225735119

Contexts¶

1. Selection¶

In [72]:
df.select("userId").head(1)
Out[72]:
shape: (1, 1)
userId
i64
1
In [117]:
df.with_columns(
    norm_rating=pl.col("rating") / pl.col("rating").max()
).head(1)
Out[117]:
shape: (1, 5)
userId | movieId | rating | timestamp  | norm_rating
i64    | i64     | f64    | i64        | f64
1      | 1       | 4.0    | 1225734739 | 0.8

2. Filtering¶

In [118]:
df.shape
Out[118]:
(33832162, 4)
In [119]:
df.filter(pl.col("rating") == 5).shape
Out[119]:
(4957902, 4)
In [122]:
df.filter(
    (pl.col("rating") == 5) | (pl.col("rating") == 0.5)
).shape
Out[122]:
(5524208, 4)

3. Aggregation¶

In [86]:
df.group_by(pl.col("movieId")).agg(pl.col("rating").mean()).head(5)
Out[86]:
shape: (5, 2)
movieId | rating
i64     | f64
267456  | 3.720459
87520   | 2.683784
203008  | 3.0
93024   | 2.911765
277872  | 3.5
In [124]:
(
    df.group_by(pl.col("movieId"))
    .agg(
        r_mean=pl.col("rating").mean(),
        r_min=pl.col("rating").min(),
        r_max=pl.col("rating").max()
    )
).head(5)
Out[124]:
shape: (5, 4)
movieId | r_mean   | r_min | r_max
i64     | f64      | f64   | f64
237736  | 4.0      | 4.0   | 4.0
242232  | 3.75     | 3.0   | 4.5
219464  | 4.166667 | 3.5   | 5.0
220296  | 1.875    | 0.5   | 4.0
167744  | 2.913043 | 0.5   | 4.5

Expressions¶

  • A mapping from a series to a series: series in, series out.
  • Because of this, expressions can be piped together, much like method chaining on dataframes.
  • Executed in parallel.

Quite a bit here!


In [145]:
pl.col("col") + 5
Out[145]:
[(col("col")) + (5)]
In [146]:
type(_)
Out[146]:
polars.expr.expr.Expr
In [153]:
[pl.col(col) + i for i, col in enumerate(["userId", "movieId"], start=1)]
Out[153]:
[<polars.expr.expr.Expr at 0x28b2b0430>,
 <polars.expr.expr.Expr at 0x28b2b37f0>]
In [154]:
df.select(_).head()
Out[154]:
shape: (5, 2)
userId | movieId
i64    | i64
2      | 3
2      | 112
2      | 160
2      | 262
2      | 358
In [155]:
df.select(
    pl.col("userId") + 1,
    pl.col("movieId") + 2,
).head()
Out[155]:
shape: (5, 2)
userId | movieId
i64    | i64
2      | 3
2      | 112
2      | 160
2      | 262
2      | 358
In [164]:
(
    df
    .select(
        pl.col("rating")
        .cast(pl.Float32)
        .truediv(pl.col("rating").max())
        .ge(0.5)
    )
    .group_by("rating")
    .count()
)
Out[164]:
shape: (2, 2)
rating | count
bool   | u32
true   | 29543310
false  | 4288852

Method Chaining Syntax¶

In [125]:
df.filter(pl.col("userId") == 1).group_by("rating").count().sort("count")
Out[125]:
shape: (7, 2)
rating | count
f64    | u32
2.5    | 1
2.0    | 1
4.5    | 5
3.0    | 8
3.5    | 11
5.0    | 15
4.0    | 21
In [126]:
df.head()
Out[126]:
shape: (5, 4)
userId | movieId | rating | timestamp
i64    | i64     | f64    | i64
1      | 1       | 4.0    | 1225734739
1      | 110     | 4.0    | 1225865086
1      | 158     | 4.0    | 1225733503
1      | 260     | 4.5    | 1225735204
1      | 356     | 5.0    | 1225735119
In [127]:
(
    df
    .head()
)
Out[127]:
shape: (5, 4)
userId | movieId | rating | timestamp
i64    | i64     | f64    | i64
1      | 1       | 4.0    | 1225734739
1      | 110     | 4.0    | 1225865086
1      | 158     | 4.0    | 1225733503
1      | 260     | 4.5    | 1225735204
1      | 356     | 5.0    | 1225735119
In [128]:
(
    df
    .filter(pl.col("userId") == 1)
    .head()
)
Out[128]:
shape: (5, 4)
userId | movieId | rating | timestamp
i64    | i64     | f64    | i64
1      | 1       | 4.0    | 1225734739
1      | 110     | 4.0    | 1225865086
1      | 158     | 4.0    | 1225733503
1      | 260     | 4.5    | 1225735204
1      | 356     | 5.0    | 1225735119
In [129]:
(
    df
    .filter(pl.col("userId") == 1)
    .group_by("rating")
    .count()
)
Out[129]:
shape: (7, 2)
rating | count
f64    | u32
2.5    | 1
4.5    | 5
2.0    | 1
3.5    | 11
3.0    | 8
5.0    | 15
4.0    | 21
In [130]:
(
    df
    .filter(pl.col("userId") == 1)
    .group_by("rating")
    .count()
    .sort("count")
)
Out[130]:
shape: (7, 2)
rating | count
f64    | u32
2.5    | 1
2.0    | 1
4.5    | 5
3.0    | 8
3.5    | 11
5.0    | 15
4.0    | 21
In [131]:
(
    df
    .filter(pl.col("userId") == 1)
    .group_by("rating")
    .count()
    .sort("count")
).to_pandas().plot(x="rating", y="count", kind="scatter");
[Scatter plot of count vs. rating]

Composable Transformation Pipelines¶

In [112]:
def snake_case_ftw(df):
    return (
        df
        .rename({"movieId": "movie_id", "userId": "user_id"})
    )
In [113]:
def only_big_time_reviewers(df):
    return (
        df
        .filter(
           pl.count().over("user_id") > 1000
        )
    )
In [114]:
(
    df
    .pipe(snake_case_ftw)
    .pipe(only_big_time_reviewers)
)["user_id"].n_unique()
Out[114]:
3651

Lazy Evaluation¶

1.4GB Parquet:
- 25s
- 108GB+ peak memory usage

54GB Arrow:
- 10s
- 16GB peak memory usage

+ Lazy Scan:
- 1s
- 3GB peak memory usage
df = (
    pl.scan_ipc("dataset.arrow")
    .join(cohort.lazy(), on="id", validate="m:1")
    .select("id", "date", "seq_event", "seq_event_id")
    .collect()
)

Use .explain() to inspect the optimized query plan

SQL Context¶

In [171]:
(
    pl.SQLContext(df=df)
    .execute(
        "SELECT userId, rating*2 AS double_rate FROM df"
    )
    .collect()
).head()
Out[171]:
shape: (5, 2)
userId | double_rate
i64    | f64
1      | 8.0
1      | 8.0
1      | 8.0
1      | 9.0
1      | 10.0

Larger than memory dataset¶

  • Serialize the dataset as an .arrow file; .read_ipc() memory-maps the file by default.

  • Use .collect(streaming=True) to stream the computation. (See docs on supported ops)

  • Lazily evaluate computation in chunks on full dataset with .scan_ipc() and .slice()

Examples on a ~4TB, 250B-row dataset:

In [134]:
def row_count(path):
    return (
        pl.scan_ipc(path, cache=False, rechunk=False, memory_map=True)
        .select(pl.count())
        .collect()
        .item()
    )
In [135]:
def event_counts(path, n_rows, chunk_size=5_000):
    results = []
    df = pl.scan_ipc(path, cache=False, rechunk=False, memory_map=True)
    for start in range(0, n_rows, chunk_size):
        chunk = df.slice(start, chunk_size)
        partial_result = (
            chunk
            .select(pl.col("event").alias("event_id"))
            .explode("event_id")
            .group_by("event_id")
            .agg(pl.count().cast(pl.UInt64).alias("count"))
        )
        results.append(partial_result)
    final_result = (
        pl.concat(results)
        .group_by("event_id")
        .agg(pl.sum("count").cast(pl.UInt64).alias("count"))
        .sort("count", descending=True)
        .collect()
    )
    return final_result

Tutorials¶

  • Coming from Pandas - https://pola-rs.github.io/polars/user-guide/migration/pandas/
  • Modern Polars - https://kevinheavey.github.io/modern-polars/

Thanks! 🙏¶
